﻿#include "box_network.hh"
#include "box_channel.hh"
#include "fixed_mem_pool.hh"

#include "klogger/interface/logger.h"

#include "config/box_config.hh"

#include "detail/box_alloc.hh"
#include "detail/lang_impl.hh"

#include "root/rpc_root.h"
#include "root/rpc_stub.h"

#include "util/box_debug.hh"
#include "util/object_pool.hh"
#include "util/os_util.hh"
#include "util/string_util.hh"

#ifndef SB_SDK
#include "loop/http/http_loop.hh"
#endif // !SB_SDK
#include "loop/kcp/kcp_loop.hh"
#include "loop/tcp_loop.hh"
#include "loop/udp/udp_loop.hh"

#include <atomic>
#include <cstring>
#include <unordered_map>

#if defined(_WIN32) || defined(_WIN64)
#pragma comment(lib, "ws2_32.lib")
#endif

namespace kratos {
namespace service {

//
// TODO 确保一些函数，只能在主线程内调用
//

/**
 * 空管道.
 */
static std::shared_ptr<BoxChannel> NullChannel;
/**
 * 在工作线程内访问LoopCore指针, 主线程内不使用
 */
thread_local LoopCore *worker_loop_core_ptr_{nullptr};

BoxNetwork::BoxNetwork() {}

BoxNetwork::~BoxNetwork() {}

auto BoxNetwork::start(LoopCorePtr loop_core_ptr) -> bool {
  if (!is_main_thread()) {
    write_log(lang::LangID::LANG_UNEXPECTED_ERROR, klogger::Logger::FATAL,
              "network", "BoxNetwork::start can only call in main thread");
    return false;
  }
  //
  // 启动网络线程
  // NOTICE 每一个loop一个工作线程
  //
  start_worker(loop_core_ptr);
  // 添加到loop数组
  loop_core_vec_.emplace_back(std::move(loop_core_ptr));
  return true;
}

auto BoxNetwork::start() -> bool {
  if (!is_main_thread()) {
    write_log(lang::LangID::LANG_UNEXPECTED_ERROR, klogger::Logger::FATAL,
              "network", "BoxNetwork::start can only call in main thread");
    return false;
  }
  //
  // 初始化配置重载回调
  //
  static std::once_flag init_config_reload;
  std::call_once(init_config_reload, [this] { on_config_change(); });
  return true;
}

auto BoxNetwork::is_main_thread() -> bool {
  if (main_thr_id_ == std::thread::id()) {
    //
    // update还没有被执行
    //
    return true;
  }
  return (main_thr_id_ == std::this_thread::get_id());
}

auto BoxNetwork::start_internal(const std::string &type) -> LoopCorePtr {
  if (!is_main_thread()) {
    write_log(lang::LangID::LANG_UNEXPECTED_ERROR, klogger::Logger::FATAL,
              "network", "BoxNetwork::start can only call in main thread");
    return nullptr;
  }
  //
  // 检查是否已经建立过此类型的core
  //
  for (auto &core : loop_core_vec_) {
    if (core->type_ == type) {
      return core;
    }
  }
  //
  // 创建并启动loop
  //
  auto loop_ptr = create_and_start_loop(type);
  if (!loop_ptr) {
    return nullptr;
  }
  auto loop_core_ptr = kratos::make_shared_pool_ptr<LoopCore>();
  loop_core_ptr->loop_ = loop_ptr;
  loop_core_ptr->type_ = type;
  if (!start(loop_core_ptr)) {
    return nullptr;
  } else {
    write_log(lang::LangID::LANG_INFO, klogger::Logger::INFORMATION, "network",
              ("Start loop [" + type + "] successfully").c_str());
  }
  return loop_core_ptr;
}

auto BoxNetwork::stop() -> bool {
  if (!is_main_thread()) {
    write_log(lang::LangID::LANG_UNEXPECTED_ERROR, klogger::Logger::FATAL,
              "network", "BoxNetwork::start can only call in main thread");
    return false;
  }
  for (auto &core_ptr : loop_core_vec_) {
    core_ptr->running_ = false;
    while (!core_ptr->thread_exit_) {
      if (core_ptr->thread_.joinable()) {
        core_ptr->thread_.join();
      }
    }
    if (core_ptr->loop_) {
      write_log(lang::LangID::LANG_INFO, klogger::Logger::INFORMATION,
                "network",
                ("Stop loop [" + core_ptr->type_ + "] successfully").c_str());
      core_ptr->loop_->stop();
      core_ptr->loop_ = nullptr;
    }
    // 清理未完成的网络事件
    NetEventData event_data{};
    while (true) {
      auto result = core_ptr->main_queue.try_read(event_data);
      if (!result) {
        break;
      }
      event_data.clear();
    }
  }
  //
  // 手动清理所有管理器, 不依赖析构函数清理
  //
  listener_name_map_.clear();
  loop_core_vec_.clear();
  loop_factory_map_.clear();
  return true;
}

auto BoxNetwork::start_worker(LoopCorePtr loop_core_ptr) -> void {
  channel_recv_buffer_len_ = get_config().get_box_channel_recv_buffer_len();
  loop_core_ptr->running_ = true;
  loop_core_ptr->thread_ = std::thread([this, loop_core_ptr]() {
    //
    // 设置'thread_local'的指针，工作线程后续使用
    //
    worker_loop_core_ptr_ = loop_core_ptr.get();
    // 设置当前线程的退出标志
    loop_core_ptr->thread_exit_ = false;
    while (loop_core_ptr->running_) {
      // 1. 运行网络主循环
      // 2. 处理队列的网络事件
      if (loop_core_ptr->loop_) {
        loop_core_ptr->loop_->worker_update();
      }
      network_thread_processor(loop_core_ptr);
    }
    // 清理网络线程
    worker_cleanup(loop_core_ptr);
  });
}

auto BoxNetwork::worker_cleanup(LoopCorePtr loop_core_ptr) -> void {
  // 清理所有未处理的网络事件
  NetEventData event_data{};
  while (true) {
    auto result = loop_core_ptr->net_queue.try_read(event_data);
    if (!result) {
      break;
    }
    event_data.clear();
  }
  worker_loop_core_ptr_ = nullptr;
  // 线程退出标志
  loop_core_ptr->thread_exit_ = true;
}

auto BoxNetwork::on_config_change() -> void {
  get_config().add_reload_listener(
      "network", [&](const std::string &, const config::BoxConfig &new_config) {
        const auto &old_config = get_config();
        if (old_config.get_box_channel_recv_buffer_len() !=
            new_config.get_box_channel_recv_buffer_len()) {
          channel_recv_buffer_len_ =
              new_config.get_box_channel_recv_buffer_len();
          std::string reason1 =
              "box_channel_recv_buffer_len = " +
              std::to_string(old_config.get_box_channel_recv_buffer_len());
          std::string reason2 =
              "box_channel_recv_buffer_len = " +
              std::to_string(new_config.get_box_channel_recv_buffer_len());
          write_log(lang::LangID::LANG_RELOAD_INFO, klogger::Logger::WARNING,
                    reason1.c_str(), reason2.c_str());
        }
      });
}

auto BoxNetwork::create_loop(const std::string &type) -> LoopPtr {
  auto it = loop_factory_map_.find(type);
  if (it == loop_factory_map_.end()) {
    return nullptr;
  }
  return it->second->create_loop();
}

auto BoxNetwork::create_and_start_loop(const std::string &loop_type)
    -> LoopPtr {
  LoopPtr loop_ptr;
  //
  // 尝试建立外部注册的Loop
  //
  loop_ptr = create_loop(loop_type);
  if (!loop_ptr) {
    //
    // 使用内置的网络循环
    //
    if (loop_type == "tcp") {
      loop_ptr = kratos::make_shared_pool_ptr<kratos::loop::TcpLoop>(this);
    } else if (loop_type == "udp") {
      loop_ptr = kratos::make_shared_pool_ptr<kratos::loop::UdpLoop>(this);
    } else if (loop_type == "kcp") {
      loop_ptr = kratos::make_shared_pool_ptr<kratos::loop::KcpLoop>(this);
    } 
#ifndef SB_SDK
    else if (loop_type == "http") {
      loop_ptr = kratos::make_shared_pool_ptr<kratos::loop::HttpLoop>(this);
    } 
#endif // !SB_SDK
    else {
      loop_ptr = kratos::make_shared_pool_ptr<kratos::loop::TcpLoop>(this);
    }
  }
  if (!loop_ptr) {
    return nullptr;
  }
  if (!loop_ptr->start()) {
    loop_ptr->stop();
    loop_ptr.reset();
    return nullptr;
  }
  return loop_ptr;
}

void BoxNetwork::network_thread_processor(LoopCorePtr loop_core_ptr) {
  while (true) {
    NetEventData event_data{};
    auto result = loop_core_ptr->net_queue.try_read(event_data);
    if (!result) {
      return;
    }
    try {
      if (loop_core_ptr->loop_) {
        loop_core_ptr->loop_->do_worker_event(event_data);
      }
    } catch (std::exception &e) {
      write_log(lang::LangID::LANG_UNEXPECTED_EXCEPTION,
                klogger::Logger::FAILURE, "BoxNetwork",
                util::demangle(typeid(e).name()).c_str(), e.what());
    }
    event_data.clear();
  }
}

auto BoxNetwork::listen_at(const std::string name, const std::string &type,
                           const std::string &host, int port) -> bool {
  if (!is_main_thread()) {
    write_log(lang::LangID::LANG_UNEXPECTED_ERROR, klogger::Logger::FATAL,
              "network", "BoxNetwork::start can only call in main thread");
    return false;
  }
  write_log(lang::LangID::LANG_INFO, klogger::Logger::INFORMATION, "network",
            ("Start listner [" + name + "]").c_str());
  auto ip = util::get_host_ip(host);
  if (!util::is_ip_address(ip) || (port == 0) || (name.empty())) {
    return false;
  }
  auto loop_core_ptr = start_internal(type);
  if (!loop_core_ptr) {
    return false;
  }
  NetEventData event_data{};
  std::string real_name = name;
  if (real_name.empty()) {
    real_name = "unknown";
  }
  event_data.event_id = NetEvent::listen_request;
  event_data.listen_request.name = box_malloc(real_name.size() + 1);
  box_assert(this, !event_data.listen_request.name);

  event_data.listen_request.host = box_malloc(host.size() + 1);
  box_assert(this, !event_data.listen_request.host);

  event_data.listen_request.port = port;
  memset(event_data.listen_request.name, 0, real_name.size() + 1);
  memcpy(event_data.listen_request.name, real_name.c_str(), real_name.size());
  memset(event_data.listen_request.host, 0, host.size() + 1);
  memcpy(event_data.listen_request.host, host.c_str(), host.size());
  // 发送到网络线程
  if (!loop_core_ptr->main_queue.send(event_data)) {
    event_data.clear();
    return false;
  }
  return true;
}

auto BoxNetwork::connect_to(const std::string name, const std::string &type,
                            const std::string &host, int port, int timeout)
    -> bool {
  if (!is_main_thread()) {
    write_log(lang::LangID::LANG_UNEXPECTED_ERROR, klogger::Logger::FATAL,
              "network", "BoxNetwork::start can only call in main thread");
    return false;
  }
  write_log(lang::LangID::LANG_INFO, klogger::Logger::INFORMATION, "network",
            ("Start connector [" + name + "]").c_str());
  auto ip = util::get_host_ip(host);
  if (!util::is_ip_address(ip) || (port == 0) || (name.empty())) {
    return false;
  }
  auto loop_core_ptr = start_internal(type);
  if (!loop_core_ptr) {
    return false;
  }
  NetEventData event_data{};
  event_data.event_id = NetEvent::connect_request;
  event_data.connect_request.host = box_malloc(host.size() + 1);
  box_assert(this, !event_data.connect_request.host);

  event_data.connect_request.name = box_malloc(name.size() + 1);
  box_assert(this, !event_data.connect_request.name);

  event_data.connect_request.port = port;
  event_data.connect_request.timeout = timeout;
  memset(event_data.connect_request.host, 0, host.size() + 1);
  memcpy(event_data.connect_request.host, host.c_str(), host.size());
  memset(event_data.connect_request.name, 0, name.size() + 1);
  memcpy(event_data.connect_request.name, name.c_str(), name.size());
  // 发送到网络线程
  if (!loop_core_ptr->main_queue.send(event_data)) {
    event_data.clear();
    return false;
  }
  return true;
}

auto BoxNetwork::close_channel(std::uint64_t id) -> bool {
  if (!is_main_thread()) {
    write_log(lang::LangID::LANG_UNEXPECTED_ERROR, klogger::Logger::FATAL,
              "network", "BoxNetwork::start can only call in main thread");
    return false;
  }
  return enqueue_close_request(id);
}

auto BoxNetwork::do_event_main(LoopCorePtr loop_core_ptr,
                               NetEventData &event_data) -> void {
  switch (event_data.event_id) {
  case NetEvent::listen_response: {
    auto id = event_data.listen_response.channel_id;
    if (event_data.listen_response.success) {
      auto ptr = make_shared_pool_ptr<BoxChannel>(
          id, this, event_data.listen_response.name);

      box_assert(this, !ptr);

      main_channel_map_[id] = {ptr, &loop_core_ptr->main_queue};
      on_listen(event_data.listen_response.name, true, ptr);
    } else {
      on_listen(event_data.listen_response.name, false, NullChannel);
    }
  } break;
  case NetEvent::accept_notify: {
    auto id = event_data.accept_notify.channel_id;
    auto ptr = make_shared_pool_ptr<BoxChannel>(id, this,
                                                event_data.accept_notify.name);
    //
    // 设置地址
    //
    rpc::Address local_addr;
    rpc::Address peer_addr;
    if (event_data.accept_notify.local_ip) {
      local_addr.ip = event_data.accept_notify.local_ip;
      local_addr.port = event_data.accept_notify.local_port;
    }
    if (event_data.accept_notify.peer_ip) {
      peer_addr.ip = event_data.accept_notify.peer_ip;
      peer_addr.port = event_data.accept_notify.peer_port;
    }
    ptr->set_address(local_addr, peer_addr);

    box_assert(this, !id);
    box_assert(this, !ptr);

    main_channel_map_[id] = {ptr, &loop_core_ptr->main_queue};
    on_accept(ptr);
  } break;
  case NetEvent::close_notify: {
    auto id = event_data.close_notify.channel_id;
    box_assert(this, !id);

    auto it = main_channel_map_.find(id);
    if (it == main_channel_map_.end()) {
      break;
    }
    auto ptr = it->second.channel_ptr;

    box_assert(this, !ptr);
    // 关闭事件处理
    on_close(ptr);
    // 设置关闭标记
    ptr->set_close_flag();
    // 更新主线程管道表
    main_channel_map_.erase(it);
  } break;
  case NetEvent::connect_response: {
    if (event_data.connect_response.success) {
      auto id = event_data.connect_response.channel_id;
      box_assert(this, !id);

      auto ptr = make_shared_pool_ptr<BoxChannel>(
          id, this, event_data.connect_response.name);

      //
      // 设置地址
      //
      rpc::Address local_addr;
      rpc::Address peer_addr;
      if (event_data.connect_response.local_ip) {
        local_addr.ip = event_data.connect_response.local_ip;
        local_addr.port = event_data.connect_response.local_port;
      }
      if (event_data.connect_response.peer_ip) {
        peer_addr.ip = event_data.connect_response.peer_ip;
        peer_addr.port = event_data.connect_response.peer_port;
      }
      ptr->set_address(local_addr, peer_addr);

      box_assert(this, !ptr);

      // 更新主线程管道表
      main_channel_map_[id] = {ptr, &loop_core_ptr->main_queue};
      // 成功
      on_connect(event_data.connect_response.name, true, ptr);
    } else {
      // 失败
      on_connect(event_data.connect_response.name, false, NullChannel);
    }
  } break;
  case NetEvent::recv_data_notify: {
    auto id = event_data.recv_data_notify.channel_id;
    box_assert(this, !id);

    auto it = main_channel_map_.find(id);
    if (it == main_channel_map_.end()) {
      break;
    }
    auto ptr = it->second.channel_ptr;

    box_assert(this, !ptr);

    auto bytes = ptr->write_buffer(event_data.recv_data_notify.data_ptr,
                                   event_data.recv_data_notify.length);
    if (bytes != event_data.recv_data_notify.length) {
      ptr->close();
    } else {
      on_data(ptr);
    }
  } break;
  default:
    break;
  }
}

auto BoxNetwork::update() -> void {
  if (main_thr_id_ == std::thread::id()) {
    //
    // 获取主线程ID
    //
    main_thr_id_ = std::this_thread::get_id();
  }
  NetEventData event_data{};
  for (auto &core : loop_core_vec_) {
    while (core->main_queue.try_read(event_data)) {
      try {
        do_event_main(core, event_data);
      } catch (std::exception &e) {
        write_log(lang::LangID::LANG_UNEXPECTED_EXCEPTION,
                  klogger::Logger::FAILURE, "BoxNetwork",
                  util::demangle(typeid(e).name()).c_str(), e.what());
      }
      event_data.clear();
    }
  }
}

auto BoxNetwork::get_channel(std::uint64_t id) const noexcept
    -> const std::shared_ptr<BoxChannel> & {
  auto it = main_channel_map_.find(id);
  if (it == main_channel_map_.end()) {
    return NullChannel;
  }
  return it->second.channel_ptr;
}

SPSCQueuePair &BoxNetwork::get_net_queue() noexcept {
  return worker_loop_core_ptr_->net_queue;
}

SPSCQueuePair &BoxNetwork::get_main_queue() noexcept {
  return worker_loop_core_ptr_->main_queue;
}

auto BoxNetwork::get_listener_name_map() noexcept -> ListenerNameMap & {
  return listener_name_map_;
}

int BoxNetwork::get_channel_recv_buffer_len() const noexcept {
  return channel_recv_buffer_len_;
}

auto BoxNetwork::register_loop_factory(const std::string &type,
                                       LoopFactoryPtr loop_factory_ptr)
    -> bool {
  if (loop_factory_map_.find(type) != loop_factory_map_.end()) {
    return false;
  }
  loop_factory_map_.emplace(type, loop_factory_ptr);
  return true;
}

auto BoxNetwork::listen_response(const std::string &name,
                                 std::uint64_t channel_id, bool success)
    -> bool {
  NetEventData response{};
  response.listen_response.name = nullptr;
  auto length = static_cast<int>(name.size() + 1);
  response.listen_response.name = box_malloc(length);
  memcpy(response.listen_response.name, name.c_str(), length);
  response.event_id = NetEvent::listen_response;
  if (!success) {
    response.listen_response.channel_id = 0;
    response.listen_response.success = false;
  } else {
    response.listen_response.channel_id = channel_id;
    response.listen_response.success = true;
  }
  if (!get_net_queue().send(response)) {
    // 发送失败，清理资源
    // 发生这种情况，会导致逻辑线程无法得知监听器是否启动成功
    response.clear();
  }
  return true;
}

auto BoxNetwork::accept_notify(const std::string &name,
                               std::uint64_t channel_id,
                               const std::string &local_ip, int local_port,
                               const std::string &peer_ip, int peer_port)
    -> bool {
  NetEventData event_data{};
  event_data.event_id = NetEvent::accept_notify;
  event_data.accept_notify.name = box_malloc(name.size() + 1);
  memcpy(event_data.accept_notify.name, name.c_str(), name.size() + 1);
  event_data.accept_notify.channel_id = channel_id;
  event_data.accept_notify.local_ip = nullptr;
  event_data.accept_notify.local_port = local_port;
  event_data.accept_notify.peer_ip = nullptr;
  event_data.accept_notify.peer_port = peer_port;
  event_data.accept_notify.local_ip = box_malloc(local_ip.size() + 1);
  memcpy(event_data.accept_notify.local_ip, local_ip.c_str(),
         local_ip.size() + 1);
  event_data.accept_notify.peer_ip = box_malloc(peer_ip.size() + 1);
  memcpy(event_data.accept_notify.peer_ip, peer_ip.c_str(), peer_ip.size() + 1);
  if (!get_net_queue().send(event_data)) {
    // 发送失败，清理资源
    // 发生这种情况，会导致逻辑线程无法得知监听器是否启动成功
    event_data.clear();
  }
  return true;
}

auto BoxNetwork::connect_response(const std::string &name,
                                  std::uint64_t channel_id,
                                  const std::string &local_ip, int local_port,
                                  const std::string &peer_ip, int peer_port,
                                  bool success) -> bool {
  NetEventData event_data{};
  event_data.event_id = NetEvent::connect_response;
  event_data.connect_response.success = success;
  event_data.connect_response.name = box_malloc(name.size() + 1);
  memcpy(event_data.connect_response.name, name.c_str(), name.size() + 1);
  event_data.connect_response.channel_id = channel_id;
  event_data.connect_response.local_ip = nullptr;
  event_data.connect_response.local_port = local_port;
  event_data.connect_response.peer_ip = nullptr;
  event_data.connect_response.peer_port = peer_port;
  event_data.connect_response.local_ip = box_malloc(local_ip.size() + 1);
  memcpy(event_data.connect_response.local_ip, local_ip.c_str(),
         local_ip.size() + 1);
  event_data.connect_response.peer_ip = box_malloc(peer_ip.size() + 1);
  memcpy(event_data.connect_response.peer_ip, peer_ip.c_str(),
         peer_ip.size() + 1);
  if (!get_net_queue().send(event_data)) {
    // 发送失败，清理资源
    // 发生这种情况，会导致逻辑线程无法得知监听器是否启动成功
    event_data.clear();
  }
  return true;
}

auto BoxNetwork::recv_data_notify(std::uint64_t channel_id, char *data,
                                  int length) -> bool {
  NetEventData event_data{};
  event_data.event_id = NetEvent::recv_data_notify;
  event_data.recv_data_notify.channel_id = channel_id;
  event_data.recv_data_notify.length = length;
  event_data.recv_data_notify.data_ptr = box_malloc(std::size_t(length));
  memcpy(event_data.recv_data_notify.data_ptr, data, std::size_t(length));
  if (!get_net_queue().send(event_data)) {
    // 发送失败，清理资源
    // 发生这种情况，会导致逻辑线程无法得知监听器是否启动成功
    event_data.clear();
  }
  return true;
}

auto BoxNetwork::close_notify(std::uint64_t channel_id) -> bool {
  NetEventData event_data{};
  event_data.event_id = NetEvent::close_notify;
  event_data.close_notify.channel_id = channel_id;
  if (!get_net_queue().send(event_data)) {
    // 发送失败，清理资源
    // 发生这种情况，会导致逻辑线程无法得知监听器是否启动成功
    event_data.clear();
  }
  return true;
}

auto BoxNetwork::gen_uuid() -> std::uint64_t {
  static std::atomic_uint64_t id = 1;
  return id++;
}

int BoxNetwork::enqueue_send_request(std::uint64_t id, const char *data,
                                     int size) {

  box_assert(this, !id);
  box_assert(this, !data);
  box_assert(this, !size);

  auto it = main_channel_map_.find(id);
  if (it == main_channel_map_.end()) {
    return 0;
  }
  box_assert(this, !it->second.main_queue_ptr);

  NetEventData event{};
  event.event_id = NetEvent::send_data_notify;
  event.send_data_notify.channel_id = id;
  event.send_data_notify.data_ptr = box_malloc(size);
  box_assert(this, !event.send_data_notify.data_ptr);

  event.send_data_notify.length = size;
  memcpy(event.send_data_notify.data_ptr, data, size);

  // 发送到网络线程
  if (!it->second.main_queue_ptr->send(event)) {
    event.clear();
    return 0;
  }
  return size;
}

bool BoxNetwork::enqueue_close_request(std::uint64_t id) {
  box_assert(this, !id);

  auto it = main_channel_map_.find(id);
  if (it == main_channel_map_.end()) {
    return 0;
  }
  box_assert(this, !it->second.main_queue_ptr);

  NetEventData event{};
  event.event_id = NetEvent::close_request;
  event.close_request.channel_id = id;
  return it->second.main_queue_ptr->send(event);
}

} // namespace service
} // namespace kratos
